1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.operators;
17  
18  import static org.junit.Assert.fail;
19  import static org.mockito.Matchers.any;
20  import static org.mockito.Mockito.*;
21  
22  import java.util.Arrays;
23  
24  import org.junit.*;
25  
26  import rx.*;
27  import rx.Observable.OnSubscribe;
28  import rx.functions.Func1;
29  import rx.observers.TestSubscriber;
30  import rx.subjects.*;
31  
32  public class OperatorTakeWhileTest {
33  
34      @Test
35      public void testTakeWhile1() {
36          Observable<Integer> w = Observable.just(1, 2, 3);
37          Observable<Integer> take = w.takeWhile(new Func1<Integer, Boolean>() {
38              @Override
39              public Boolean call(Integer input) {
40                  return input < 3;
41              }
42          });
43  
44          @SuppressWarnings("unchecked")
45          Observer<Integer> observer = mock(Observer.class);
46          take.subscribe(observer);
47          verify(observer, times(1)).onNext(1);
48          verify(observer, times(1)).onNext(2);
49          verify(observer, never()).onNext(3);
50          verify(observer, never()).onError(any(Throwable.class));
51          verify(observer, times(1)).onCompleted();
52      }
53  
54      @Test
55      public void testTakeWhileOnSubject1() {
56          Subject<Integer, Integer> s = PublishSubject.create();
57          Observable<Integer> take = s.takeWhile(new Func1<Integer, Boolean>() {
58              @Override
59              public Boolean call(Integer input) {
60                  return input < 3;
61              }
62          });
63  
64          @SuppressWarnings("unchecked")
65          Observer<Integer> observer = mock(Observer.class);
66          take.subscribe(observer);
67  
68          s.onNext(1);
69          s.onNext(2);
70          s.onNext(3);
71          s.onNext(4);
72          s.onNext(5);
73          s.onCompleted();
74  
75          verify(observer, times(1)).onNext(1);
76          verify(observer, times(1)).onNext(2);
77          verify(observer, never()).onNext(3);
78          verify(observer, never()).onNext(4);
79          verify(observer, never()).onNext(5);
80          verify(observer, never()).onError(any(Throwable.class));
81          verify(observer, times(1)).onCompleted();
82      }
83  
84      @Test
85      public void testTakeWhile2() {
86          Observable<String> w = Observable.just("one", "two", "three");
87          Observable<String> take = w.takeWhile(new Func1<String, Boolean>() {
88              int index = 0;
89  
90              @Override
91              public Boolean call(String input) {
92                  return index++ < 2;
93              }
94          });
95  
96          @SuppressWarnings("unchecked")
97          Observer<String> observer = mock(Observer.class);
98          take.subscribe(observer);
99          verify(observer, times(1)).onNext("one");
100         verify(observer, times(1)).onNext("two");
101         verify(observer, never()).onNext("three");
102         verify(observer, never()).onError(any(Throwable.class));
103         verify(observer, times(1)).onCompleted();
104     }
105 
106     @Test
107     public void testTakeWhileDoesntLeakErrors() {
108         Observable<String> source = Observable.create(new OnSubscribe<String>() {
109             @Override
110             public void call(Subscriber<? super String> observer) {
111                 observer.onNext("one");
112                 observer.onError(new Throwable("test failed"));
113             }
114         });
115 
116         source.takeWhile(new Func1<String, Boolean>() {
117             @Override
118             public Boolean call(String s) {
119                 return false;
120             }
121         }).toBlocking().lastOrDefault("");
122     }
123 
124     @Test
125     public void testTakeWhileProtectsPredicateCall() {
126         TestObservable source = new TestObservable(mock(Subscription.class), "one");
127         final RuntimeException testException = new RuntimeException("test exception");
128 
129         @SuppressWarnings("unchecked")
130         Observer<String> observer = mock(Observer.class);
131         Observable<String> take = Observable.create(source).takeWhile(new Func1<String, Boolean>() {
132             @Override
133             public Boolean call(String s) {
134                 throw testException;
135             }
136         });
137         take.subscribe(observer);
138 
139         // wait for the Observable to complete
140         try {
141             source.t.join();
142         } catch (Throwable e) {
143             e.printStackTrace();
144             fail(e.getMessage());
145         }
146 
147         verify(observer, never()).onNext(any(String.class));
148         verify(observer, times(1)).onError(testException);
149     }
150 
151     @Test
152     public void testUnsubscribeAfterTake() {
153         Subscription s = mock(Subscription.class);
154         TestObservable w = new TestObservable(s, "one", "two", "three");
155 
156         @SuppressWarnings("unchecked")
157         Observer<String> observer = mock(Observer.class);
158         Observable<String> take = Observable.create(w).takeWhile(new Func1<String, Boolean>() {
159             int index = 0;
160 
161             @Override
162             public Boolean call(String s) {
163                 return index++ < 1;
164             }
165         });
166         take.subscribe(observer);
167 
168         // wait for the Observable to complete
169         try {
170             w.t.join();
171         } catch (Throwable e) {
172             e.printStackTrace();
173             fail(e.getMessage());
174         }
175 
176         System.out.println("TestObservable thread finished");
177         verify(observer, times(1)).onNext("one");
178         verify(observer, never()).onNext("two");
179         verify(observer, never()).onNext("three");
180         verify(s, times(1)).unsubscribe();
181     }
182 
183     private static class TestObservable implements Observable.OnSubscribe<String> {
184 
185         final Subscription s;
186         final String[] values;
187         Thread t = null;
188 
189         public TestObservable(Subscription s, String... values) {
190             this.s = s;
191             this.values = values;
192         }
193 
194         @Override
195         public void call(final Subscriber<? super String> observer) {
196             System.out.println("TestObservable subscribed to ...");
197             observer.add(s);
198             t = new Thread(new Runnable() {
199 
200                 @Override
201                 public void run() {
202                     try {
203                         System.out.println("running TestObservable thread");
204                         for (String s : values) {
205                             System.out.println("TestObservable onNext: " + s);
206                             observer.onNext(s);
207                         }
208                         observer.onCompleted();
209                     } catch (Throwable e) {
210                         throw new RuntimeException(e);
211                     }
212                 }
213 
214             });
215             System.out.println("starting TestObservable thread");
216             t.start();
217             System.out.println("done starting TestObservable thread");
218         }
219     }
220     
221     @Test
222     public void testBackpressure() {
223         Observable<Integer> source = Observable.range(1, 1000).takeWhile(new Func1<Integer, Boolean>() {
224             @Override
225             public Boolean call(Integer t1) {
226                 return t1 < 100;
227             }
228         });
229         TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
230             @Override
231             public void onStart() {
232                 request(5);
233             }
234         };
235         
236         source.subscribe(ts);
237         
238         ts.assertNoErrors();
239         ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5));
240         
241         ts.requestMore(5);
242 
243         ts.assertNoErrors();
244         ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
245     }
246     
247     @Test
248     public void testNoUnsubscribeDownstream() {
249         Observable<Integer> source = Observable.range(1, 1000).takeWhile(new Func1<Integer, Boolean>() {
250             @Override
251             public Boolean call(Integer t1) {
252                 return t1 < 2;
253             }
254         });
255         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
256         
257         source.unsafeSubscribe(ts);
258         
259         ts.assertNoErrors();
260         ts.assertReceivedOnNext(Arrays.asList(1));
261         
262         Assert.assertFalse("Unsubscribed!", ts.isUnsubscribed());
263     }
264 }